众所周知,Apache Kylin 是基于星型模型设计的,并不支持雪花模型(更新:在最近发布的2.0版本中增加了对雪花模型的支持)。然而在大多数情况下,Kylin的存储引擎是Hive。不同于普通RDMS,Hive支持复杂数据类型,包括Array,Map和Struct。复杂数据类型的字段又可以称为内嵌对象(nested object), 某种程度上属于雪花模型。如果导入带有复杂数据类型字段的Hive表,Kylin会在校验表元信息的时候报错。
这意味着我们需要先对该表做平铺(flatten)处理,再将结果导入Kylin。此时就会用到Hive的UDTF。UDTF(User Defined Table Generating Functions)用于将一行输入转化为多行或者多列,将一个map字段分拆为多行或者多列自然不在话下。
根据官方文档,Hive内置了explode()函数可以拆分Map字段。不过explode()是按列拆,也就是拆成key和value两行,而我们希望的是将Map每个Key对应的属性拆为一列,这样我们就可以在Kylin中直接根据列名来建模。因此我们需要自定义UDTF来完成平铺,另外由于这个函数涉及列名,所以是和Map字段紧耦合的,对于不同的Map要字段要定义不同的UDTF。
假设Hive表的一个Map
package me.whitewood.bigdata.hive.udtf;
/**
* Hive UTDF Demo
* Created by Whitewood on 2017/8/12.
*/
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.lazy.LazyMap;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FlattenMap extends GenericUDTF{
@Override
public void close() throws HiveException {}
/**
* 初始化函数,用于检验参数类型,并返回UDTF的输出字段名和数据类型
*/
@Override
public StructObjectInspector initialize(StructObjectInspector structObjectInspector)
throws UDFArgumentException {
List<String> fieldNames = new ArrayList<>(4);
List<ObjectInspector> fieldOIs = new ArrayList<>(4);
fieldNames.add("task");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("cash");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
fieldNames.add("online");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
fieldNames.add("gold");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
@Override
public void process(Object[] args) throws HiveException {
Object input = args[0];
if(!(input instanceof LazyMap)){
throw new HiveException("flattenMap function should only be applied to map columns");
}
//这里要注意,Hive的Map字段是LazyMap,其中的Key、Value也是LazyString。
//若不手动将其转为String,直接用String类型的Key去Map取值会总是为null。
Map<Object,Object> map = ((LazyMap) input).getMap();
Map<String,String> stringMap = new HashMap<>(4);
for(Map.Entry entry: map.entrySet()){//lazy string
stringMap.put(entry.getKey().toString(), entry.getValue().toString());
}
forward(new Object[]{
stringMap.getOrDefault("task", null),
Integer.parseInt(stringMap.getOrDefault("cash", "0")),
Integer.parseInt(stringMap.getOrDefault("gold", "0")),
Integer.parseInt(stringMap.getOrDefault("online", "0")),
});
}
}
开发好UDTF之后将其打包为Kylin-UDTF-1.0-SNAPSHOT.jar,放到HDFS某个路径下:hdfs dfs -put /home/whitewood/hive/udtf/Kylin-UDTF-1.0-SNAPSHOT.jar hdfs://masters/user/hive/udf
将它注册到Hive的permenant function中:CREATE FUNCTION flattenMa AS 'me.whitewood.bigdata.hive.udtf.FlattenMap' using jar 'hdfs://masters/user/hive/udf/Kylin-UDTF-1.0-SNAPSHOT.jar';
之后就可以在SQL中使用自定义的UDTF:SELECT flattenMap(taskMap) FROM tbl_task LIMIT 10;
参考文献:
1.【Kylin实战】Hive复杂数据类型与视图